简单版的 Token认证


# auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from api.models import *


class TokenAuth(BaseAuthentication):
    def authenticate(self, request):
# token = request.data.get('token') # 从请求体中获取token参数
        token = request.META.get('HTTP_AUTHORIZATION')  # 从请求头中获取token参数
        token_obj = UserToken.objects.filter(token=token).first()
        if not token_obj:
            raise AuthenticationFailed({'code': 0, 'error': '认证失败'})
# return '', None
        return token_obj.user, token_obj

# views.py

from rest_framework.views import APIView
from rest_framework.response import Response
from api.models import *

from api.utils.auth import TokenAuth

import uuid
import datetime


# 初始化返回值
class BaseResponse:
    def __init__(self):
        self.code = ''
        self.status = False
        self.data = None
        self.msg = ''


    @property
    def dic(self):
        return self.__dict__


# 登陆,写入token
class LoginView(APIView):
    def post(self, request, *args, **kwargs):
        res = BaseResponse()
        try:
            username = request.data.get('username')
            password = request.data.get('password')
            user = UserInfo.objects.filter(username=username, password=password).first()
            if user:
                uid = uuid.uuid4()
                UserToken.objects.update_or_create(user=user, defaults={'token': uid, 'created': datetime.datetime.now()})
                res.status = True
                res.data = {
                    'token': uid
                }
                res.msg = '登陆成功'
        except Exception as e:
            res.code = 1000
            res.msg = '登陆失败'
        return Response(res.dic)


# 测试认证
class IndexView(APIView):
    authentication_classes = [TokenAuth]

    def get(self, request, *args, **kwargs):
        res = BaseResponse()
        res.status = True
        res.data = {'username': request.user.username}
        return Response(res.dic)

# models.py

from django.db import models


class UserInfo(models.Model):
"""
    用户表
    """
    username = models.CharField(verbose_name='用户名', max_length=32)
    password = models.CharField(verbose_name='密码', max_length=32)

    class Meta:
        verbose_name = "用户表"
        verbose_name_plural = verbose_name

    def __str__(self):
        return self.username


class UserToken(models.Model):
"""
    用户token表
    """
    user = models.OneToOneField(to='UserInfo')
    token = models.CharField(verbose_name='token', max_length=64)
    created = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)

    class Meta:
        verbose_name = "用户Token表"
        verbose_name_plural = verbose_name

复杂版的 Token认证


  • 功能描述: 缓存token(即: 不用每次从数据库中获取token进行比较) + 超时token

# auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from api.models import *

from django.core.cache import cache

import datetime
import pytz


class TokenAuth(BaseAuthentication):
    def authenticate(self, request):
# token = request.data.get('token')  # 从请求体中获取token参数
        token = request.META.get('HTTP_AUTHORIZATION')  # 从请求头中获取token参数

# 从缓存中获取以token为key的缓存内容
        user = cache.get(token)
        if user:
            return user, token

# 数据库验证:如果缓存中没有token,那么就使用数据库验证
        token_obj = UserToken.objects.filter(token=token).first()
        if not token_obj:
            raise AuthenticationFailed({'code': 0, 'error': '认证失败'})
# return '', None

# 验证token是否在有效期内
        now_time = datetime.datetime.now()
        now_time = now_time.replace(tzinfo=pytz.timezone('UTC'))  # 因为.now()默认获取是不带时区的时间,而从数据库中获取的是带时区的时间所以要给.now()添加上时区
        create_time = token_obj.created
        delta = now_time - create_time  # 相差时间

        if delta < datetime.timedelta(weeks=2):  # 当前时间是否小于两周的时间
# 校验成功,写入数据库
            delta = datetime.timedelta(weeks=2) - delta  # token 剩余时间
            cache.set(token_obj.token, token_obj.user, min(delta.total_seconds(), 60 * 60 * 24 * 7))  # delta.total_seconds():获取实际的秒数,如果剩余时间超过7天,那么就使用7时间,如果没有则使用剩余的时间,这里的时间不能是写死的
        else:
            raise AuthenticationFailed({'code': 0, 'error': 'token超时'})

        return token_obj.user, token_obj

# views.py

# 这里的代码就是简单版的token认证中的 views.py 代码

# models.py

# 这里的代码就是简单版的token认证中的 models.py 代码

JsonWebToken(简称: JWT) Token认证


注意: JsonWebToken 是基于 Django 所提供的 User 表进行操作的

1. 下载模块

pip3 install djangorestframework-jwt -i https://pypi.douban.com/simple  # 使用豆瓣的镜像

2. 全局配置

# settings.py

JWT_AUTH = {
    'JWT_EXPIRATION_DELTA': datetime.timedelta(days=7),  # 过期时间,默认过期时间300秒
    'JWT_AUTH_HEADER_PREFIX': 'JWT',  # token前缀,默认值 JWT
}

3. jwt所提供的url -> 自动获取、刷新、验证 token

from rest_framework_jwt.views import (
    obtain_jwt_token,
    verify_jwt_token,
    refresh_jwt_token
)

urlpatterns = [
    url(r'^login/', obtain_jwt_token),  # 登录验证 + 获取token
    url(r'^api-token-refresh/', refresh_jwt_token),  # 刷新token
    url(r'^api-token-verify/', verify_jwt_token),  # 验证token
]

4. 手动获取、验证 token

  • 其实 obtain_jwt_token 视图类都是调用 JSONWebTokenSerializer 下的 validate 方法进行验证
  • 其实 verify_jwt_token 视图类都是调用 VerifyJSONWebTokenSerializer 下的 validate 方法进行验证

from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework_jwt.serializers import (
    JSONWebTokenSerializer,
    VerifyJSONWebTokenSerializer
)


class LoginView(APIView):
    def post(self, request, *args, **kwargs):
        req_data = {
            'username': request.data.get('username'),
            'password': request.data.get('password')
        }
# 手动获取 token
        jwt = JSONWebTokenSerializer()
        user_info = jwt.validate(req_data)
        return Response({'token': user_info['token']})


class TokenAuth(BaseAuthentication):
    def authenticate(self, request):
        http_token = request.META.get('HTTP_TOKEN')
        token = {"token": http_token}
# 手动验证 token
        vjt = VerifyJSONWebTokenSerializer()
        valid_data = vjt.validate(token)
        return valid_data['user'], valid_data['token']


class IndexView(APIView):
    authentication_classes = [TokenAuth]

    def get(self, request, *args, **kwargs):
        return Response({
            'msg': 'index_info',
            'user': request.user.username,
            'token': request.auth
        })

5. 重写 validate 方法

  • 如果直接使用 obtain_jwt_token 视图类验证登陆,那么在登陆失败的时候会返回状态码为400的错误,如果想返回状态码为200的自定义登陆失败信息,那么就要重写 obtain_jwt_token -> JSONWebTokenSerializer -> validate 方法

# login_jwt.py

from rest_framework_jwt.serializers import JSONWebTokenSerializer
from rest_framework import serializers
from django.contrib.auth import authenticate
from django.utils.translation import ugettext as _

from rest_framework_jwt.settings import api_settings

jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER


class LoginJWT(JSONWebTokenSerializer):
    def validate(self, attrs):
        credentials = {
            self.username_field: attrs.get(self.username_field),
            'password': attrs.get('password')
        }

        if all(credentials.values()):
            user = authenticate(**credentials)

            if user:
                if not user.is_active:
                    msg = _('User account is disabled.')
                    raise serializers.ValidationError(msg)

                payload = jwt_payload_handler(user)

                return {
                    'token': jwt_encode_handler(payload),
                    'user': user
                }
            else:
# 原本的写法
                # msg = _('Unable to log in with provided credentials.')
                # raise serializers.ValidationError(msg)

                raise Exception('登陆失败')  # 不使用 serializers.ValidationError 的异常错误
        else:
            msg = _('Must include "{username_field}" and "password".')
            msg = msg.format(username_field=self.username_field)
            raise serializers.ValidationError(msg)

# views.py

from rest_framework.views import APIView
from rest_framework.response import Response
from api.utils.login_jwt import LoginJWT


class LoginView(APIView):
    def post(self, request, *args, **kwargs):
        try:
            req_data = {
                'username': request.data.get('username'),
                'password': request.data.get('password')
            }
# 手动获取 token
            jwt = LoginJWT()
            user_info = jwt.validate(req_data)
            return Response({'token': user_info['token']})
        except Exception as e:
            return Response({'msg': str(e)})

6. 使用 JsonWebToken 进行 token 认证的例子

# settings.py

JWT_AUTH = {
    'JWT_EXPIRATION_DELTA': datetime.timedelta(days=7),  # 过期时间,默认过期时间300秒
    'JWT_AUTH_HEADER_PREFIX': 'JWT',  # token前缀,默认值 JWT
}

# usrls.py

from rest_framework_jwt.views import obtain_jwt_token

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^login/$', obtain_jwt_token),
    url(r'^index/$', IndexView.as_view()),
]

# auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework_jwt.serializers import VerifyJSONWebTokenSerializer


class TokenAuth(BaseAuthentication):
    def authenticate(self, request):
        http_token = request.META.get('HTTP_TOKEN')
        token = {"token": http_token}
# 手动验证 token
        vjt = VerifyJSONWebTokenSerializer()
        valid_data = vjt.validate(token)
        return valid_data['user'], valid_data['token']

# views.py

from rest_framework.views import APIView
from rest_framework.response import Response
from api.utils.auth import TokenAuth


class IndexView(APIView):
    authentication_classes = [TokenAuth]

    def get(self, request, *args, **kwargs):
        return Response({
            'username': request.user.username,
            'token': request.auth,
            'msg': 'index_info'
        })